package flink.eureka.connector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
  *
  * @author eureka.wh
  * @since 2019-06-11 12:46
  */
object Kafka2Kafka {

  /**
    * Reads string records from Kafka topic "weihua01" and forwards them
    * unchanged to Kafka topic "h1" (both on broker cm01:9092), printing each
    * record to stdout as well. Checkpointing runs in exactly-once mode.
    */
  def main(args: Array[String]): Unit = {

    println("哈哈哈~~~Kafka2Kafka")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Common checkpoint settings: snapshot every 4s in exactly-once mode,
    // abort a checkpoint that takes longer than 10s, and never allow
    // concurrent checkpoints.
    env.enableCheckpointing(4000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(10000)
//    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    import org.apache.flink.api.scala._

    // Source: consume topic "weihua01".
    // (The original comment said "producer" — this is the consumer side.)
    val sourceTopic = "weihua01"
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", "cm01:9092")
    consumerProps.setProperty("group.id", "kkk")

    val data = env.addSource(
      new FlinkKafkaConsumer[String](sourceTopic, new SimpleStringSchema(), consumerProps))

    // Sink: produce to topic "h1".
    // (The original comment said "consumer" — this is the producer side.)
    val sinkTopic = "h1"
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "cm01:9092")

    // Pass SimpleStringSchema directly instead of wrapping it in the
    // deprecated KeyedSerializationSchemaWrapper from the connector's
    // `internals` package; FlinkKafkaProducer accepts a plain
    // SerializationSchema for unkeyed records.
    val kafkaSink = new FlinkKafkaProducer[String](sinkTopic,
      new SimpleStringSchema(),
      producerProps)

    data.addSink(kafkaSink)

    data.print()

    env.execute("Kafka2Kafka")
  }
}
